package net.dwade.livechat.websocket.client;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.CookieStore;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.cookie.Cookie;
import org.apache.http.impl.client.BasicCookieStore;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.cookie.BasicClientCookie;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.UndertowXhrTransport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import org.springframework.web.socket.sockjs.frame.Jackson2SockJsMessageCodec;
import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
import org.xnio.OptionMap;
import org.xnio.Options;

import com.alibaba.fastjson.JSONObject;

import net.dwade.livechat.websocket.beans.SubscribeType;
import net.dwade.livechat.websocket.beans.WebSocketRequestMessage;

/**
 * WebSocket visitor-side chat client.
 * <p>
 * Exposes the HTTP steps ({@link #index()}, {@link #clientPull()}) and the
 * STOMP-over-SockJS steps ({@link #connect()}, {@link #sendMessage(String)},
 * {@link #disconnect()}). The HTTP requests share one cookie store that is
 * pre-seeded with the {@code SESSION} cookie.
 * @author huangxf
 * @date 2016-12-26
 */
public class ChatWebsocketClient implements ChatClient {
	
	/** Value sent as the HTTP {@code User-Agent} header. */
	private final String userAgent;
	
	/** URL of the index request. */
	private final String indexUrl;
	
	/** URL of the queue-polling (pull) request. */
	private final String pullUrl;
	
	/** URL of the WebSocket endpoint. */
	private final String chatUrl;
	
	/** Cookie domain; use the host IP when there is no domain name. */
	private final String domain;
	
	/** HTTP session id, sent as the {@code SESSION} cookie. */
	private final String httpSessionId;
	
	/** Pull status: still waiting in the queue. (Name kept as-is for compatibility.) */
	public static final String STATUS_WATING = "0";
	
	/** Pull status: accepted, a chat session has been assigned. */
	public static final String STATUS_ACCEPTED = "1";
	
	/** {@code retCode} value that marks a successful index response. */
	public static final String SUCCESS = "000000";
	
	private static final Logger logger = LoggerFactory.getLogger( ChatWebsocketClient.class );
	
	/**
	 * Shared codec instance. Creating one codec per client instance causes threads
	 * to block on class loading when many clients are created, so it is built once.
	 * @see Jackson2SockJsMessageCodec#Jackson2SockJsMessageCodec()
	 * @see Jackson2ObjectMapperBuilder#build()
	 */
	private static final SockJsMessageCodec MESSAGE_CODEC = new Jackson2SockJsMessageCodec();
	
	/** Shared scheduler handed to the STOMP client for WebSocket heartbeats. */
	private static final ThreadPoolTaskScheduler taskScheduler;
	
	private final SockJsClient sockJsClient;
	
	private CookieStore cookieStore = null;
	
	private HttpClientContext context = null;
	
	/** Number of queue polls that returned "waiting" so far; never reset. */
	private final AtomicInteger pullCount = new AtomicInteger();
	
	/** Number of messages sent; used to build unique request ids. */
	private final AtomicInteger sendCount = new AtomicInteger();
	
	/** Maximum number of "waiting" polls tolerated before giving up. */
	public static final int MAX_PULL = 30;
	
	/** Chat session id, populated by {@link #clientPull()} once accepted. */
	private String chatSessionId;
	
	protected WebSocketStompClient stompClient;
	
	protected StompSession stompSession;
	
	// Initialise the task scheduler used to keep the WebSocket heartbeat alive.
	static {
		ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
		scheduler.setPoolSize( 100 );
		scheduler.setThreadNamePrefix( "WebsocketHeartBeat-" );
		scheduler.initialize();
		taskScheduler = scheduler;
	}
	
	/**
	 * Creates a client that talks WebSocket through a SockJS client with a single
	 * {@link WebSocketTransport}, then runs {@link #postConstruct()}.
	 * <p>
	 * Note: {@link #postConstruct()} is overridable and is invoked from this
	 * constructor, so overrides run before subclass fields are initialised.
	 *
	 * @param userAgent     value of the {@code User-Agent} header for HTTP requests
	 * @param indexUrl      URL of the index request
	 * @param pullUrl       URL of the queue-polling request
	 * @param chatUrl       URL of the WebSocket endpoint
	 * @param domain        cookie domain (or host IP)
	 * @param httpSessionId value of the {@code SESSION} cookie
	 */
	public ChatWebsocketClient(String userAgent, String indexUrl, String pullUrl, 
			String chatUrl, String domain, String httpSessionId) {
		
		this.userAgent = userAgent;
		this.indexUrl = indexUrl;
		this.pullUrl = pullUrl;
		this.chatUrl = chatUrl;
		this.domain = domain;
		this.httpSessionId = httpSessionId;
		
		// Set up the transport and the SockJsClient.
		Transport webSocketTransport = new WebSocketTransport( new StandardWebSocketClient() );
		List<Transport> transports = Collections.singletonList( webSocketTransport );

		this.sockJsClient = new SockJsClient( transports );
		sockJsClient.setMessageCodec( MESSAGE_CODEC );
		
		this.postConstruct();
	}
	
	/**
	 * Builds the cookie store and HTTP context and seeds them with the
	 * {@code SESSION} cookie.
	 */
	protected void postConstruct() {
		
		cookieStore = new BasicCookieStore();
		context = HttpClientContext.create();
		context.setCookieStore( cookieStore );
		
		BasicClientCookie cookie = new BasicClientCookie( "SESSION", this.httpSessionId );
		// The domain must be set, otherwise the cookie is not sent to the server.
		cookie.setDomain( this.domain );
		cookieStore.addCookie( cookie );
		
	}

	/**
	 * Issues the index request and validates the response.
	 *
	 * @throws RuntimeException if the request fails, the HTTP status is not 2xx,
	 *         or the JSON {@code retCode} is not {@link #SUCCESS}
	 */
	public void index() {
		
		CloseableHttpClient client = null;
		CloseableHttpResponse response = null;
		
		try {
			
			client = HttpClients.custom().setDefaultCookieStore( cookieStore ).build();
			HttpGet get = new HttpGet( indexUrl );
			get.setHeader( "User-Agent", userAgent );
			
			// Pass the context so that cookies are retained across requests.
			response = client.execute( get, context );
			
			// Reject anything outside 2xx (the previous "&&" condition could never be true).
			int status = response.getStatusLine().getStatusCode();
			if ( status < 200 || status >= 300 ) {
				throw new RuntimeException( "index请求错误, http status:" + status );
			}
			
			HttpEntity entity = response.getEntity();
			String json = EntityUtils.toString( entity, "UTF-8" );
			logger.info( "index response:{}", json );
			
			String retCode = JSONObject.parseObject( json ).getString( "retCode" );
			if ( !SUCCESS.equals( retCode ) ) {
				throw new RuntimeException( "index请求错误, json result:" + json );
			}
			
		} catch( Exception e ) {
			throw new RuntimeException( "index response error", e );
		} finally {
			IOUtils.closeQuietly( response );
			IOUtils.closeQuietly( client );
		}
	}
	
	/**
	 * Polls the queue until the server accepts this visitor.
	 * <p>
	 * While the status is {@link #STATUS_WATING} the request is repeated every
	 * 3 seconds; once more than {@link #MAX_PULL} waiting responses have been
	 * counted (the counter is per instance and is never reset) polling is aborted.
	 * On {@link #STATUS_ACCEPTED} the chat session id is stored.
	 *
	 * @throws RuntimeException if a request fails, the status is unknown, the
	 *         poll limit is exceeded, or the thread is interrupted while waiting
	 */
	public void clientPull() {
		try {
			while ( true ) {
				String json = executePull();
				logger.info( "Client pull:{}", json );
				
				// Interpret the pull result.
				JSONObject result = JSONObject.parseObject( json );
				String status = result.getString( "status" );
				
				// Accepted: remember the chat session and stop polling.
				if ( STATUS_ACCEPTED.equals( status ) ) {
					chatSessionId = result.getString( "chatSessionId" );
					return;
				}
				if ( !STATUS_WATING.equals( status ) ) {
					throw new RuntimeException( "client pull error:" + json );
				}
				
				// Still queued: keep waiting, up to the configured limit.
				int times = pullCount.incrementAndGet();
				if ( times > MAX_PULL ) {
					throw new RuntimeException( "轮询排队超过上限" );
				}
				Thread.sleep( 3000 );
			}
		} catch( InterruptedException e ) {
			// Restore the interrupt flag so callers can observe the interruption.
			Thread.currentThread().interrupt();
			throw new RuntimeException( "client pull response error", e );
		} catch( Exception e ) {
			throw new RuntimeException( "client pull response error", e );
		}
	}
	
	/**
	 * Performs one pull request and returns the response body; the HTTP client
	 * and response are closed before returning so nothing is held while waiting.
	 */
	private String executePull() throws IOException {
		CloseableHttpClient client = null;
		CloseableHttpResponse response = null;
		try {
			client = HttpClients.custom().setDefaultCookieStore( cookieStore ).build();
			HttpPost method = new HttpPost( pullUrl );
			method.setHeader( "User-Agent", userAgent );
			
			// Pass the context so that cookies are retained across requests.
			response = client.execute( method, context );
			HttpEntity entity = response.getEntity();
			return EntityUtils.toString( entity, "UTF-8" );
		} finally {
			IOUtils.closeQuietly( response );
			IOUtils.closeQuietly( client );
		}
	}
	
	/**
	 * Opens the WebSocket connection. <strong>Note: subscription is asynchronous
	 * and has to be handled according to the actual situation.</strong>
	 *
	 * @throws RuntimeException if connecting or subscribing fails
	 */
	public void connect() {
		try {
			standardWebsocket();
		} catch (Exception e) {
			throw new RuntimeException( "连接失败", e );
		}
	}
	
	/**
	 * Releases the connection: disconnects the STOMP session first, then stops
	 * the STOMP client (the client is stopped even if the disconnect fails).
	 */
	public void disconnect() {
		try {
			if ( stompSession != null ) {
				stompSession.disconnect();
			}
		} finally {
			if ( stompClient != null ) {
				stompClient.stop();
			}
		}
	}
	
	/**
	 * Sends a text chat message to {@code /app/sendMessage} as UTF-8 encoded JSON.
	 *
	 * @param text message text
	 * @return the request message that was sent
	 * @throws IllegalStateException if no STOMP session has been established yet
	 */
	public WebSocketRequestMessage sendMessage( String text ) {
		final StompSession session = this.stompSession;
		if ( session == null ) {
			throw new IllegalStateException( "WebSocket session is not connected, call connect() first" );
		}
		final WebSocketRequestMessage msg = new WebSocketRequestMessage();
		msg.setWebsocketRequestId( session.getSessionId() + "-" + sendCount.incrementAndGet() );
		msg.setContentType( "0" );
		msg.setMessageType( "0" );
		msg.setPostTime( new Date() );
		msg.setMessage( text );
		
		this.beforeSendMessage( msg );
		byte[] data = JSONObject.toJSONString( msg ).getBytes( StandardCharsets.UTF_8 );
		session.send( "/app/sendMessage", data );
		return msg;
	}
	
	/**
	 * Extension hook for subclasses, invoked right before a message is serialised and sent.
	 * @param message the message about to be sent
	 */
	protected void beforeSendMessage( WebSocketRequestMessage message ) { }
	
	/**
	 * Callback invoked for every frame received on a subscribed destination.
	 * @param session     the STOMP session the frame arrived on
	 * @param type        the subscription the frame belongs to
	 * @param textMessage frame payload decoded as UTF-8 text
	 */
	protected void subscribeCallback( StompSession session, SubscribeType type, String textMessage ) {
		logger.debug( "系统消息已订阅成功" );
	}
	
	/**
	 * Hook invoked once the WebSocket connection is established; subscriptions
	 * have been requested but are not necessarily confirmed yet.
	 * @param session the connected STOMP session
	 */
	protected void afterConnected( StompSession session ) {
		logger.debug( "Websocket连接成功" );
	}
	
	/**
	 * Connects through the instance's SockJS client, blocks until the STOMP
	 * session is available, subscribes, and then calls
	 * {@link #afterConnected(StompSession)}.
	 *
	 * @throws Exception if the connection cannot be established
	 */
	protected void standardWebsocket() throws Exception {
		
		// Main purpose: set the Cookie request header. Mind the format:
		// Cookie: SESSION=bbc43bd3-b38c-40d0-bf53-ad9967a11254
		HttpHeaders httpHeaders = new HttpHeaders();
		httpHeaders.set( HttpHeaders.COOKIE, "SESSION=" + this.getHttpSessionId() );
		WebSocketHttpHeaders wsHeaders = new WebSocketHttpHeaders( httpHeaders );
		logger.info( "WebSocketHttpHeaders:{}", wsHeaders );

		this.stompClient = new WebSocketStompClient( sockJsClient );
		stompClient.setTaskScheduler( taskScheduler );
		ListenableFuture<StompSession> future = stompClient.connect( chatUrl, wsHeaders, new SimpleStompSessionHandler() );
		
		// Block until connected.
		StompSession session = future.get();
		
		// Publish the session before subscribing so that callbacks triggered by
		// incoming frames (and disconnect()) can already see it.
		this.stompSession = session;
		subscribe( session );
		
		afterConnected( session );
	}
	
	/**
	 * Connects using {@link UndertowXhrTransport} for two-way communication,
	 * mimicking a browser that uses xhr-streaming. The transport instance needs
	 * an {@link OptionMap}.
	 * https://fossies.org/linux/spring-framework/docs/javadoc-api/org/springframework/web/socket/sockjs/client/UndertowXhrTransport.html
	 * <p>
	 * Unlike {@link #standardWebsocket()}, this variant does not set the heartbeat
	 * task scheduler and does not call {@link #afterConnected(StompSession)}.
	 *
	 * @throws Exception if the connection cannot be established
	 * @see UndertowXhrTransport#UndertowXhrTransport(OptionMap)
	 */
	protected void xhrStreamWebsocket() throws Exception {
		
		// Main purpose: set the Cookie request header. Mind the format:
		// Cookie: SESSION=bbc43bd3-b38c-40d0-bf53-ad9967a11254
		HttpHeaders httpHeaders = new HttpHeaders();
		httpHeaders.set( HttpHeaders.COOKIE, "SESSION=" + this.getHttpSessionId() );
		WebSocketHttpHeaders wsHeaders = new WebSocketHttpHeaders( httpHeaders );
		logger.info( "WebSocketHttpHeaders:{}", wsHeaders );
		
		OptionMap optionMap = OptionMap.builder()
				   .set(Options.WORKER_IO_THREADS, 1)
				   .set(Options.TCP_NODELAY, true)
				   .set(Options.KEEP_ALIVE, true)
				   .set(Options.WORKER_NAME, "SockJSClient")
				   .getMap();
		Transport xhrTransport = new UndertowXhrTransport( optionMap );
		List<Transport> transports = Collections.singletonList( xhrTransport );

		// Reuse the shared codec; building a new one per connection is what the
		// MESSAGE_CODEC constant exists to avoid.
		SockJsClient xhrSockJsClient = new SockJsClient( transports );
		xhrSockJsClient.setMessageCodec( MESSAGE_CODEC );

		this.stompClient = new WebSocketStompClient( xhrSockJsClient );
		ListenableFuture<StompSession> future = stompClient.connect( chatUrl, wsHeaders, new SimpleStompSessionHandler() );
		StompSession session = future.get();
		
		this.stompSession = session;
		
		subscribe( session );
		
	}

	/**
	 * Subscribes the session to the chat-message, chat-response, system-close and
	 * system-message destinations; every received frame is forwarded to
	 * {@link #subscribeCallback(StompSession, SubscribeType, String)}.
	 * @param session the connected STOMP session
	 */
	protected void subscribe( final StompSession session ) {
		
		subscribeTo( session, SubscribeType.CHAT_MESSAGE, false );
		subscribeTo( session, SubscribeType.CHAT_RESPONSE, false );
		subscribeTo( session, SubscribeType.SYSTEM_CLOSE, false );
		
		// System messages are additionally written to the log.
		subscribeTo( session, SubscribeType.SYSTEM_MESSAGE, true );

	}
	
	/**
	 * Registers one byte-array frame handler that decodes the payload as UTF-8,
	 * optionally logs it, and hands it to the subscribe callback.
	 */
	private void subscribeTo( final StompSession session, final SubscribeType type, final boolean logPayload ) {
		session.subscribe( type.destination, new StompFrameHandler() {
			@Override
			public void handleFrame(StompHeaders headers, Object payload) {
				// Decode explicitly as UTF-8 to mirror the encoding used when sending.
				String result = new String( ( byte[] ) payload, StandardCharsets.UTF_8 );
				if ( logPayload ) {
					logger.info( "Subscribe system message:{}", result );
				}
				subscribeCallback( session, type, result );
			}
			@Override
			public Type getPayloadType(StompHeaders headers) {
				return byte[].class;
			}
		});
	}
	
	/**
	 * Returns the configured HTTP session id or, when that is blank, the value of
	 * the {@code SESSION} cookie in the cookie store; {@code null} if neither exists.
	 */
	private String getHttpSessionId() {
		if ( StringUtils.isNotBlank( this.httpSessionId ) ) {
			return this.httpSessionId;
		}
		List<Cookie> cookies = cookieStore.getCookies();
		for ( Cookie cookie : cookies ) {
			if ( "SESSION".equals( cookie.getName() ) ) {
				return cookie.getValue();
			}
		}
		return null;
	}

	public String getUserAgent() {
		return userAgent;
	}

	public String getIndexUrl() {
		return indexUrl;
	}

	public String getPullUrl() {
		return pullUrl;
	}

	public String getChatUrl() {
		return chatUrl;
	}

	public String getDomain() {
		return domain;
	}

	public String getChatSessionId() {
		return chatSessionId;
	}

	@Override
	public String toString() {
		return "ChatWebsocketClient [userAgent=" + userAgent + ", indexUrl="
				+ indexUrl + ", pullUrl=" + pullUrl + ", chatUrl=" + chatUrl
				+ ", domain=" + domain + ", httpSessionId=" + httpSessionId
				+ ", chatSessionId=" + chatSessionId + "]";
	}
	
}
